{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Testing SparkMonitor Extension"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The configuration object `SparkConf` is provided by the extension, added to the namespace as '`conf`'.  \n",
    "The user passes this to the SparkContext \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:28:41.278758Z",
     "start_time": "2019-10-03T20:28:41.272238Z"
    }
   },
   "outputs": [],
   "source": [
    "print(conf.toDebugString()) #Instance of SparkConf with options set by the extension"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "User adds other options and starts the spark context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:28:47.578343Z",
     "start_time": "2019-10-03T20:28:43.907033Z"
    }
   },
   "outputs": [],
   "source": [
    "conf.setAppName('ExtensionTestingApp')\n",
    "#conf.setMaster('spark://localhost:7077') # if master is started using command line\n",
    "conf.setMaster('local[*]')\n",
    "from pyspark import SparkContext\n",
    "sc=SparkContext.getOrCreate(conf=conf) #Start the spark context"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example spark job"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:29:03.773530Z",
     "start_time": "2019-10-03T20:28:50.498890Z"
    }
   },
   "outputs": [],
   "source": [
    "import time\n",
    "b=sc.broadcast([3,5]) #Creating a broadcast variable available on all executors\n",
    "a=sc.accumulator(0)   #Creating an accumulator for adding values across executors\n",
    "RDD0=sc.parallelize([y for y in range(0,5)]) #RDD from input python collection\n",
    "RDD2=sc.parallelize([z for z in range(10,15)])\n",
    "RDD1=RDD0.cartesian(RDD2) \n",
    "cached=RDD2.cache() #Testing cached RDD\n",
    "RDD22=RDD1.map(lambda x:x[0]+x[1]+b.value[0])\n",
    "RDD3=RDD22.repartition(5) # To trigger a new stage.\n",
    "RDD4=RDD2.map(lambda x: 3*x-b.value[0])\n",
    "RDD5=RDD3.filter(lambda x:x%2==0)\n",
    "RDD6=RDD4.filter(lambda x:x%2!=0)\n",
    "RDD7=RDD5.cartesian(RDD6)\n",
    "RDD8=RDD7.flatMap(lambda x: [x[i] for i in range(0,2)])\n",
    "RDD9=RDD8.union(cached)\n",
    "ans=RDD9.reduce(lambda x,y: x+y) # Doing a simple sum on the random data.\n",
    "print(ans)\n",
    "def f(x):\n",
    "    global a\n",
    "    time.sleep(0.5) #Making the job run a little longer\n",
    "    a+=x\n",
    "RDD9.foreach(f)\n",
    "print(a.value)\n",
    "#Display should appear automatically"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:29:05.551425Z",
     "start_time": "2019-10-03T20:29:04.699019Z"
    }
   },
   "outputs": [],
   "source": [
    "sc.parallelize(range(0,100)).count()\n",
    "sc.parallelize(range(0,100)).count()\n",
    "sc.parallelize(range(0,100)).count()\n",
    "sc.parallelize(range(0,100)).count()\n",
    "sc.parallelize(range(0,100)).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:29:08.505454Z",
     "start_time": "2019-10-03T20:29:07.835796Z"
    }
   },
   "outputs": [],
   "source": [
    "sc.parallelize(range(0,100)).map(lambda x:x*x).filter(lambda x:x%2==0).count()\n",
    "sc.parallelize(range(0,100)).map(lambda x:x*x).filter(lambda x:x%2==0).count()\n",
    "sc.parallelize(range(0,100)).map(lambda x:x*x).filter(lambda x:x%2==0).count()\n",
    "sc.parallelize(range(0,100)).map(lambda x:x*x).filter(lambda x:x%2==0).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-10-03T20:30:42.643996Z",
     "start_time": "2019-10-03T20:30:42.640573Z"
    }
   },
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
